【Contacts IoT】コンタクトレンズの着脱時間をデータベースに保存する
はじめに
テントの中から失礼します、CX事業本部のてんとタカハシです!
コンタクトレンズ使用者を眼障害から守ることを目的として、コンタクトレンズの着脱管理を行う IoT プロジェクトを進めています。 プロジェクトの概要については、下記の記事をご参照ください。
本プロジェクトに関する記事の一覧は下記のページにまとまっています。
概要
下記の記事で作成したデバイス(コンタクトレンズの着脱検知用ケース)を AWS IoT Core に繋げて、着脱した時間を DynamoDB に登録します。
今回の記事では、バックエンド側の環境・実装が既に作成されていることを前提として、デバイス側の実装を記載していきます。
システム構成図
デバイスから MQTT で AWS IoT Core にリクエストを投げると、Lambda を経由して DynamoDB に着脱時間が記録されます。Lambda の実装では、DynamoDB 上で既に着用開始時間が記録されているレコードが存在するかの確認を行い、既存のレコードを上書きする or 新規でレコードを作成します。
今回の記事では、上記の図で赤く囲った部分の実装 & デバイスを AWS IoT Core に繋ぐために必要なセットアップ部分の実装を記載していきます。
環境
Raspberry Pi 上の環境は下記の通りです。
$ lsb_release -a No LSB modules are available. Distributor ID: Raspbian Description: Raspbian GNU/Linux 10 (buster) Release: 10 Codename: buster $ python3 --version Python 3.7.3 $ pip3 --version pip 18.1 from /usr/lib/python3/dist-packages/pip (python 3.7) $ aws --version aws-cli/1.18.223 Python/3.7.3 Linux/5.10.17-v7+ botocore/1.19.63 $ jq --version jq-1.5-1-a5b5cbe
デバイスのセットアップ
実装
下記を順に実行して、デバイスを AWS IoT Core に接続できるようにします。
- Amazon ルート CA 証明書ファイルをダウンロードする
- Amazon ルート認証局によって署名されたクライアント証明書、鍵を作成する
- クライアント証明書にポリシーを紐付ける
- モノを作成する
- モノにクライアント証明書を紐付ける
- ATS エンドポイントを取得する
- モノの名前と ATS エンドポイントを JSON で吐く
#!/bin/bash set -euo pipefail PROJECT_NAME="contact-iot-device" IOT_POLICY_NAME="${PROJECT_NAME}-iot-policy" IOT_THING_NAME="${PROJECT_NAME}-iot-thing-$(cat /proc/sys/kernel/random/uuid)" CERTIFICATE_DIR="certificates" ROOT_CA_URL="https://www.amazontrust.com/repository/AmazonRootCA1.pem" PROFILE_PATH="${HOME}/.profile" mkdir -p ${CERTIFICATE_DIR} curl -o "${CERTIFICATE_DIR}/root.pem" ${ROOT_CA_URL} > /dev/null 2>&1 CERTIFICATE_ARN=$(aws iot create-keys-and-certificate \ --set-as-active \ --certificate-pem-outfile "${CERTIFICATE_DIR}/certificate.pem.crt" \ --public-key-outfile "${CERTIFICATE_DIR}/public.pem.key" \ --private-key-outfile "${CERTIFICATE_DIR}/private.pem.key" \ --query certificateArn \ --output text \ ) aws iot attach-policy \ --policy-name ${IOT_POLICY_NAME} \ --target ${CERTIFICATE_ARN} aws iot create-thing \ --thing-name ${IOT_THING_NAME} \ > /dev/null 2>&1 aws iot attach-thing-principal \ --thing-name ${IOT_THING_NAME} \ --principal ${CERTIFICATE_ARN} IOT_CORE_ENDPOINT=$(aws iot describe-endpoint --endpoint-type iot:Data-ATS \ --query endpointAddress \ --output text ) JSON=$(cat << EOS { "deviceId": "${IOT_THING_NAME}", "endpoint": "${IOT_CORE_ENDPOINT}" } EOS ) echo "${JSON}"
クライアント証明書に紐付けるポリシーは、contact-iot-device-iot-policy
という名前で事前に作成されていることを前提とします。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "iot:*", "Resource": "*" } ] }
実行
セットアップ用のスクリプトを実行します。その後、吐き出される JSON から必要な情報を抜き出して環境変数として設定します。
$ sudo apt-get update $ sudo apt-get install jq $ ./device-setup.sh > device-info.json $ cat device-info.json { "deviceId": "contact-iot-device-iot-thing-xxxxx", "endpoint": "yyyyy-ats.iot.zzzzz.amazonaws.com" } $ echo "export DEVICE_ID=$(cat device-info.json | jq .deviceId)" >> ~/.profile $ echo "export ENDPOINT=$(cat device-info.json | jq .endpoint)" >> ~/.profile $ source ~/.profile
MQTT で AWS IoT Core に繋げる
実装
AWS IoT SDK for Python v2 を使用して、MQTT 接続用モジュールを実装します。
import json from awscrt import io, mqtt from awsiot import mqtt_connection_builder class MqttConnector: def __init__(self, client_id, endpoint, path_to_root, path_to_key, path_to_cert): event_loop_group = io.EventLoopGroup(1) host_resolver = io.DefaultHostResolver(event_loop_group) client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver) self.client = mqtt_connection_builder.mtls_from_path( client_bootstrap=client_bootstrap, client_id=client_id, endpoint=endpoint, cert_filepath=path_to_cert, pri_key_filepath=path_to_key, ca_filepath=path_to_root, clean_session=False, keep_alive_secs=6) def __del__(self): disconnect_future = self.client.disconnect() disconnect_future.result() def connect(self): connect_future = self.client.connect() connect_future.result() def publish(self, topic, message): self.client.publish( topic=topic, payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE)
コンタクトレンズの着脱を検知したタイミング(ボタンが押下がされたら)で Publish する処理をmain.py
に追加します。追加したコードの部分が分かりやすいようにハイライト表示しています。
import RPi.GPIO as GPIO import os from time import sleep import wiringpi from mqtt_connector import MqttConnector DEVICE_ID = os.environ['DEVICE_ID'] ENDPOINT = os.environ['ENDPOINT'] TOPIC = 'contact/case/' + DEVICE_ID LED_PIN = 25 BTN_PIN = 24 SERVO_PIN = 18 OPEN = 36 CLOSE = 80 mqtt_connector = MqttConnector( DEVICE_ID, ENDPOINT, 'certificates/root.pem', 'certificates/private.pem.key', 'certificates/certificate.pem.crt') mqtt_connector.connect() on_off = GPIO.LOW def callback(ch): global on_off, mqtt_connector if ch == BTN_PIN: on_off = not on_off GPIO.output(LED_PIN, on_off) if on_off == GPIO.HIGH: wiringpi.pwmWrite(SERVO_PIN, CLOSE) mqtt_connector.publish(TOPIC, {}) else: wiringpi.pwmWrite(SERVO_PIN, OPEN) mqtt_connector.publish(TOPIC, {}) GPIO.setmode(GPIO.BCM) GPIO.setup(LED_PIN, GPIO.OUT, initial=GPIO.LOW) GPIO.setup(BTN_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) GPIO.add_event_detect(BTN_PIN, GPIO.RISING, callback=callback, bouncetime=200) wiringpi.wiringPiSetupGpio() wiringpi.pinMode(SERVO_PIN, wiringpi.GPIO.PWM_OUTPUT) wiringpi.pwmSetMode(wiringpi.GPIO.PWM_MODE_MS) wiringpi.pwmSetClock(375) wiringpi.pwmWrite(SERVO_PIN, OPEN) try: print('start...') while True: sleep(0.01) except KeyboardInterrupt: pass print('end...') del mqtt_connector GPIO.cleanup()
実行
AWS IoT SDK for Python v2
をインストールしてからmain.py
を実行します。
$ sudo pip3 install awsiotsdk $ sudo -E python3 main.py
コンタクトレンズの着脱が検知されたタイミングで、その時刻が DynamoDB に保存されます。
おわりに
デバイスの作成 & 着脱時間の保存まで実装できました。続きはアプリ側の実装です。アプリ側では、コンタクトレンズの着用時間等に応じて通知が送られてきたり、着用データを可視化したりしようと考えています。
ひと通り完成したら、どこかで LT したいなあと考えています。
今回は以上になります。最後まで読んで頂きありがとうございました!